package gu.simplemq;

import java.io.Closeable;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;

import gu.simplemq.pool.BaseMQPool;
import gu.simplemq.pool.NamedMQPools;
import static com.google.common.base.Preconditions.*;
import static gu.simplemq.utils.TypeConversionSupport.asProperties;

/**
 * {@link IMessageQueueFactory}实现基类
 * @author guyadong
 *
 */
@SuppressWarnings("rawtypes")
public abstract class BaseMessageQueueFactory<P extends BaseMQPool> implements IMessageQueueFactory,Constant {

	/**
	 * 原始的消息系统连接参数
	 */
	protected Map mqConnParams = Collections.emptyMap();
	private volatile HostAndPort hostAndPort;
	protected P pool;
	protected final IAdvisor advisor = new ZeroAdvisor();
	protected final MQPropertiesHelper propertiesHelper;
	/** 发布订阅模式的工厂实例 */
	protected IMessageQueueFactory pubsubFactory;
	protected volatile MQProperties props;
	protected BaseMessageQueueFactory(MQPropertiesHelper propertiesHelper) {
		this.propertiesHelper = propertiesHelper;
	}

	/**
	 * 执行工厂类初始化
	 * @param properties
	 */
	protected void doInit(Map properties) {
		mqConnParams = properties;
		props = propertiesHelper.with(MQLocationType.QUEUE).initParameters(properties);
		pool = getNamedMQPools().defineDefaultPool(asProperties(properties));
		/** 如果定义了 pub/sub （发布订阅）模式的 URI，则初始化 pubsubFactory */
		URI pubsub = propertiesHelper.with(MQLocationType.PUBSUB).getLocation(properties);
		if(pubsub != null){
			/** 
			 * 如果 URI scheme 不是当前实例支持的协议名，则查找对应的工厂实例，并初始化
			 * 否则忽略，由当前实例负责处理
			 */
			if(!getSupportedUriSchemes().contains(pubsub.getScheme())){
				pubsubFactory = MessageQueueFactorys.getFactoryByUriScheme(pubsub.getScheme());
				pubsubFactory.init(replacePrefix(properties, PUBSUB_PREFIX));
			}
		}
	}
	/**
	 * 返回消息系统服务器的主机名和端口
	 */
	protected HostAndPort doGetHostAndPort() {
		return propertiesHelper.with(MQLocationType.QUEUE).getHostAndPort(props);
	}
	/**
	 * @since 2.4.0
	 */
	protected abstract NamedMQPools<P> getNamedMQPools();
	protected P getPool() {
		return getNamedMQPools().getDefaultPool();
	}
	protected abstract MQInstanceSupplier<P> getMqInstanceFactory();
	@Override
	public boolean initialized() {
		return pool != null;
	}

	@Override
	public BaseMessageQueueFactory init(Map properties) {
		// double check
		if(!initialized()){
			synchronized (this) {
				if(!initialized()){
					doInit(properties);
				}
			}
		}
		return this;
	}
	
	@Override
	public BaseMessageQueueFactory init(IMQConnParameterSupplier supplier) {
		return init(checkNotNull(supplier,"supplier is null").getMQConnParameters());
	}
	@Override
	public BaseMessageQueueFactory init(String json) {
		return init(MessageQueueFactorys.asMQConnParam(json));
	}

	@Override
	public BaseMessageQueueFactory checkInitialized(){
		checkState(initialized(),"current instance is uninitizlied");
		return this;
	}
	@Override
	public BaseMessageQueueFactory asDefaultFactory(){
		MessageQueueFactorys.setDefaultFactory(this);
		return this;
	}
	@Override
	public void definePool(String name, Map props){
		MQProperties _props = propertiesHelper.with(MQLocationType.QUEUE).initParameters(props);
		getNamedMQPools().define(name, asProperties(_props), true);
		URI pubsub = propertiesHelper.with(MQLocationType.PUBSUB).getLocation(props);
		if (pubsub != null) {
			/** 如果 URI scheme 不是当前实例支持的协议名，在 pubsubFactory 中定义 */
			if(!getSupportedUriSchemes().contains(pubsub.getScheme())){
				checkNotNull(pubsubFactory,"pubsubFactory is uninitialized").definePool(name, replacePrefix(props, PUBSUB_PREFIX));
			}
		}
	}
	@Override
	public ISubscriber getSubscriber(String name) {
		if(pubsubFactory != null){
			return pubsubFactory.getSubscriber(name);
		}
		return getMqInstanceFactory().getSubscriber(getNamedMQPools().getPool(name));
	}
	@Override
	public IPublisher getPublisher(String name) {
		if(pubsubFactory != null){
			return pubsubFactory.getPublisher(name);
		}
		return getMqInstanceFactory().getPublisher(getNamedMQPools().getPool(name));
	}

	@Override
	public IConsumer getConsumer(String name) {
		return getMqInstanceFactory().getConsumer(getNamedMQPools().getPool(name));
	}

	@Override
	public IProducer getProducer(String name) {
		return getMqInstanceFactory().getProducer(getNamedMQPools().getPool(name));
	}

	@Override
	public ISubscriber getSubscriber() {
		if(pubsubFactory != null){
			return pubsubFactory.getSubscriber();
		}
		return getMqInstanceFactory().getSubscriber(getPool());
	}

	@Override
	public IPublisher getPublisher() {
		if(pubsubFactory != null){
			return pubsubFactory.getPublisher();
		}
		return getMqInstanceFactory().getPublisher(getPool());
	}

	@Override
	public IProducer getProducer() {
		return getMqInstanceFactory().getProducer(getPool());
	}

	@Override
	public IConsumer getConsumer() {
		return getMqInstanceFactory().getConsumer(getPool());
	}
	@Override
	public HostAndPort getHostAndPort() {
		// double check
		if(hostAndPort == null){
			synchronized(this){
				if(hostAndPort == null){
					hostAndPort = doGetHostAndPort();
				}
			}
		}
		return hostAndPort;
	}

	@SuppressWarnings({ "unchecked", "resource" })
	@Override
	public Map<String, Object> getMQConnParameters(){
		return Maps.newHashMap(checkInitialized().mqConnParams);
	}

	@Override
	public boolean testConnect() {
		return testConnect(mqConnParams, null);
	}

	@Override
	public MessageQueueType getImplType() {
		return propertiesHelper.getConstProvider().getImplType();
	}

	@Override
	public boolean testConnect(Map props, Long timeoutMills) {
		return getNamedMQPools().testConnect(props, timeoutMills);
	}
	@Override
	public BaseMessageQueueFactory setAdvisor(IAdvisor advisor){
		return this;
	}
	public IAdvisor getAdvisor(){
		return advisor;
	}
	@Override
	public String getProtocol() {
		return propertiesHelper.getConstProvider().getProtocol();
	}

	@Override
	public Set<String> getSupportedUriSchemes() {
		return Collections.unmodifiableSet(propertiesHelper.getConstProvider().getNativeSchemeMap().keySet());
	}
	/**
	 * 将Map中具有指定前缀的key替换为不带前缀的key，并返回新的Map<br>
	 * 例如：将 "pubsub.redis.host" 替换为 "redis.host",如果Map中已经存在"redis.host"则覆盖<br>
	 * 用于处理发布订阅模式的连接参数
	 * @param input
	 * @param prefix 前缀
	 * @return 替换后的Map
	 */
	@SuppressWarnings({ "unchecked" })
	protected Map replacePrefix(Map input, String prefix) {
		HashMap map = new HashMap<>(input);
		for(Object key:map.keySet().toArray(new Object[0])) {
			if(key instanceof String) {
				String name = (String)key;
				if(name.startsWith(prefix)) {
					Object value = map.remove(name);
					map.put(name.substring(prefix.length()),value);
				}
			}
		}
		return map;
	}

	@Override
	public synchronized void close() throws Exception {
		if(pubsubFactory instanceof AutoCloseable){
			((AutoCloseable)pubsubFactory).close();

		}else	if(pubsubFactory instanceof Closeable){
			((Closeable)pubsubFactory).close();
		}
		if(pool != null){
			pool.close();
			pool = null;
		}
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append(getClass().getSimpleName() + " [implementation type=");
		builder.append(getImplType());
		builder.append(",mqConnParams=");
		builder.append(mqConnParams);
		builder.append("]");
		return builder.toString();
	}

}
